
fix(qqofficial): resilient websocket reconnect and send retry #9063

Open
piexian wants to merge 2 commits into AstrBotDevs:master from piexian:fix/qqofficial-ws-resilience

@piexian (Contributor) commented Jun 28, 2026

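The QQ official bot adapter (qq_official) has several defects in WebSocket connection stability and send retries. After running for a long time, the bot tends to drop offline without recovering, gets rate-limited by QQ and stops receiving messages, and occasionally fails to send replies.
Current problems:

  1. The heartbeat interval in the botpy SDK is hard-coded rather than taken from the heartbeat period that QQ actually sends in HELLO (op=10). The SDK also does not handle op=7 (server-requested reconnect) or the session-invalid close codes (4006/4007/4009), so it cannot recover reliably after a disconnect.
  2. Frequent disconnects and reconnects within a short time have no backoff, which easily triggers QQ gateway rate limiting and makes the dropouts worse.
  3. If gateway metadata is temporarily unavailable at startup (for example, get_ws_url returns None), the adapter crashes immediately without retrying.
  4. On a passive reply, if the referenced msg_id has expired, QQ returns ServerError: 回复消息msg_id已过期 ("the msg_id of the replied message has expired"); the send is aborted with no fallback.
    Fixes [Bug] The qqofficial (websocket) adapter does not retry sending after a message send fails in some cases #8977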

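Modifications

Core files: astrbot/core/platform/sources/qqofficial/

1. qqofficial_platform_adapter.py

  • Added ManagedBotWebSocket, which takes over botpy's WebSocket lifecycle
    • Sends heartbeats at the heartbeat_interval that QQ actually provides in HELLO (op=10) instead of a hard-coded value
    • Handles op=7 (RECONNECT): re-queues the session to trigger a controlled reconnect
    • On heartbeat failure, marks the socket as not reconnectable and triggers the reconnect flow
  • Added a reconnect backoff mechanism (botClient):
    • schedule_reconnect_delay: exponential backoff (1/2/5/10/30/60s)
    • Quick-disconnect detection: 3 or more disconnects within 5 seconds force a 60s rate-limit delay, to avoid being banned by the QQ gateway
    • Session-invalid close codes (4006/4007/4009): the session is marked as non-resumable
    • Once the connection is stable, reset_reconnect_backoff resets the backoff counter
  • Added startup resilience:
    • QQOfficialGatewayUnavailableError: raised when gateway metadata is unavailable
    • _bot_login: validates gateway metadata
    • _should_retry_startup_error / _restart_client / _sleep_until_retry_or_shutdown: startup failures are retried automatically after 5/10/30/60s, honoring the shutdown signal throughout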
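
The reconnect delay selection described in the list above can be sketched roughly as follows. This is a simplified illustration, not the adapter's code: the function name `next_reconnect_delay` and the constant names are hypothetical, and only the delay values (1/2/5/10/30/60s, 60s after 3 quick disconnects) are taken from this description.

```python
# Delay schedule and thresholds as listed in the PR description.
RECONNECT_DELAYS_SECONDS = (1, 2, 5, 10, 30, 60)
RATE_LIMIT_DELAY_SECONDS = 60
MAX_QUICK_DISCONNECTS = 3


def next_reconnect_delay(attempt: int, quick_disconnects: int = 0) -> int:
    """Pick the wait before reconnect attempt number `attempt` (0-based).

    `quick_disconnects` is the number of recent connections that dropped
    within the quick-disconnect window (5 seconds in the description).
    """
    if quick_disconnects >= MAX_QUICK_DISCONNECTS:
        # Too many rapid drops: back off hard to stay under the rate limit.
        return RATE_LIMIT_DELAY_SECONDS
    # Walk the schedule and stay on the last entry once it is exhausted.
    index = min(attempt, len(RECONNECT_DELAYS_SECONDS) - 1)
    return RECONNECT_DELAYS_SECONDS[index]
```

Clamping the index keeps the delay at 60s for every attempt after the sixth, so a long outage never produces an out-of-range lookup.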

2. qqofficial_message_event.py

  • _send_with_markdown_fallback: catches ServerError: 回复消息msg_id已过期 (the reply msg_id has expired), removes the expired msg_id, and retries as a proactive send
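
The fallback above might look roughly like the sketch below. It is an assumption-laden illustration rather than the code in this PR: `send_with_expired_fallback` and its `send` callable are hypothetical, and `ServerError` is a local stand-in for `botpy.errors.ServerError` so the example runs without botpy installed.

```python
from typing import Any, Awaitable, Callable

# Substring of the QQ error text quoted in this PR.
EXPIRED_MARKER = "msg_id已过期"


class ServerError(Exception):
    """Stand-in for botpy.errors.ServerError (assumption for this sketch)."""


async def send_with_expired_fallback(
    send: Callable[..., Awaitable[Any]], **payload: Any
) -> Any:
    """Try a passive reply; if its msg_id has expired, resend without it."""
    try:
        return await send(**payload)
    except ServerError as exc:
        # Only the "expired msg_id" case is recoverable here.
        if "msg_id" not in payload or EXPIRED_MARKER not in str(exc):
            raise
        proactive = {k: v for k, v in payload.items() if k != "msg_id"}
        return await send(**proactive)
```

The retry happens at most once, and any other `ServerError` is re-raised unchanged so unrelated failures are not masked.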

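3. tests/test_qqofficial_group_message_create.py

  • A retryable error is raised when gateway metadata is missing

  • The HELLO heartbeat interval uses the value sent by QQ

  • A rate limit triggers a 60s reconnect delay

  • On an op=7 reconnect, the session is re-queued correctly

  • An expired msg_id is removed and the send is retried

  • This is NOT a breaking change.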

Screenshots or Test Results


Checklist

  • 😊 If there are new features added in the PR, I have discussed it with the authors through issues/emails, etc.

  • 👀 My changes have been well-tested, and "Verification Steps" and "Screenshots" have been provided above.

  • 🤓 I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in requirements.txt and pyproject.toml.

  • 😮 My changes do not introduce malicious code.

Summary by Sourcery

Improve QQ official platform adapter resilience by adding controlled websocket reconnect logic, startup retry handling, and safer message send fallback when replies fail.

New Features:

  • Use gateway-provided heartbeat interval and handle reconnect/invalid-session opcodes in a managed websocket for QQ official bots.
  • Introduce startup retry loop for the QQ official adapter with rate-limit aware delays and graceful shutdown integration.

Bug Fixes:

  • Prevent crashes when QQ websocket gateway metadata is temporarily unavailable by treating it as a retryable startup error.
  • Avoid repeated send failures when replying to expired messages by falling back to proactive send without msg_id.
  • Reduce dropouts and unrecovered sessions by marking invalid or non-resumable websocket sessions and scheduling reconnects appropriately.
  • Mitigate gateway rate limiting by applying exponential backoff and longer delays after rapid disconnects or explicit rate-limit signals.

Tests:

  • Add unit tests covering gateway metadata absence, heartbeat interval usage from HELLO, rate-limit reconnect scheduling, server-requested reconnect handling, and reply fallback behavior when msg_id is expired.

- ManagedBotWebSocket: heartbeat (op10/7/11), reconnect backoff, rate-limit delay, session-invalid close codes (4006/4007/4009)
- retry startup when gateway metadata unavailable
- drop expired reply msg_id and retry proactive send
- add tests

@dosubot (bot) added labels on Jun 28, 2026: size:L (This PR changes 100-499 lines, ignoring generated files.) and area:platform (The bug / feature is about IM platform adapter, such as QQ, Lark, Telegram, WebChat and so on.)

@sourcery-ai sourcery-ai Bot left a comment

Hey - I've found 2 issues, and left some high level feedback:

  • In ManagedBotWebSocket, _last_heartbeat_ack_at is updated but never used to detect missed heartbeats or unhealthy connections; consider either removing it or using it to trigger a forced reconnect on prolonged missing ACKs.
  • The new QQOfficialGatewayUnavailableError is only raised when max_concurrency is missing, but other malformed or incomplete gateway metadata from get_ws_url will be accepted; consider tightening validation of the gateway response to avoid starting with unusable state.
  • QQOfficialPlatformAdapter.run has been changed to an async method, which may affect existing usage patterns that expect a synchronous run; if this adapter is invoked by framework code assuming a sync interface, consider providing a small synchronous wrapper or helper to preserve compatibility.

## Individual Comments

### Comment 1
<location path="astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py" line_range="112-113" />
<code_context>
+    async def _is_system_event(self, message_event, ws):
+        event_op = message_event["op"]
+        if event_op == self.WS_HELLO:
+            interval_ms = (message_event.get("d") or {}).get("heartbeat_interval")
+            if isinstance(interval_ms, int | float) and interval_ms > 0:
+                self._heartbeat_interval_seconds = interval_ms / 1000
+                logger.info(f"[QQOfficial] Gateway heartbeat interval: {interval_ms}ms")
</code_context>
<issue_to_address>
**issue (bug_risk):** Using `int | float` in `isinstance` is only valid on Python 3.10 and later; on earlier versions it raises at runtime.

`isinstance(interval_ms, int | float)` raises `TypeError` on Python versions before 3.10, where `|` is not defined between types; from 3.10 on it is accepted. If older interpreters must be supported, use `isinstance(interval_ms, (int, float))`, or validate/cast the value before use. On such a version, a HELLO with a numeric `heartbeat_interval` would cause `_is_system_event` to raise and may break gateway initialization.
</issue_to_address>
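
A version-independent form of the HELLO check could look like the sketch below. The helper name `heartbeat_interval_seconds` and the 30-second fallback are hypothetical and not taken from the PR; only the `op=10` opcode and the millisecond `heartbeat_interval` field come from the code context above.

```python
from typing import Any

WS_HELLO = 10  # opcode of the gateway HELLO frame
DEFAULT_INTERVAL_SECONDS = 30.0  # hypothetical fallback, not from the PR


def heartbeat_interval_seconds(frame: dict[str, Any]) -> float:
    """Return the heartbeat period announced by a HELLO frame, in seconds."""
    if frame.get("op") != WS_HELLO:
        return DEFAULT_INTERVAL_SECONDS
    interval_ms = (frame.get("d") or {}).get("heartbeat_interval")
    # The tuple form works on every Python 3 release. bool is a subclass
    # of int, so it is excluded explicitly.
    is_number = isinstance(interval_ms, (int, float)) and not isinstance(
        interval_ms, bool
    )
    if is_number and interval_ms > 0:
        return interval_ms / 1000
    return DEFAULT_INTERVAL_SECONDS
```

Falling back to a default keeps the heartbeat loop running even when the gateway sends a malformed HELLO payload.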

### Comment 2
<location path="astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py" line_range="195" />
<code_context>
         super().__init__(*args, **kwargs)
         self._shutting_down = False
         self._active_websockets: set[ManagedBotWebSocket] = set()
+        self._next_connect_at = 0.0
+        self._reconnect_attempts = 0
+        self._last_connect_at = 0.0
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting the reconnect/backoff and startup retry policies into dedicated helpers so the main client and run loop only interact with a small, high-level API instead of managing scattered state and branching.

You can keep all the behavior but reduce cognitive load by extracting the policy/state into small helpers and collapsing duplicated logic.

### 1. WebSocket reconnect/backoff: extract a policy helper

Right now `botClient` mixes:

- rate-limit handling
- quick-disconnect detection
- exponential backoff
- timestamp tracking (`_next_connect_at`, `_last_connect_at`)

You can encapsulate this in a small object (or even a helper on `botClient`) that owns the state and exposes a simple API.

For example, move the state and logic into a dedicated `ReconnectPolicy`:

```python
class ReconnectPolicy:
    def __init__(self) -> None:
        self._next_connect_at = 0.0
        self._reconnect_attempts = 0
        self._last_connect_at = 0.0
        self._quick_disconnect_count = 0

    def mark_connected(self) -> None:
        self._last_connect_at = time.monotonic()
        self._reconnect_attempts = 0
        self._quick_disconnect_count = 0
        self._next_connect_at = 0.0

    def schedule(
        self,
        *,
        custom_delay: float | None = None,
        rate_limited: bool = False,
    ) -> float:
        delay = custom_delay
        if delay is None and rate_limited:
            delay = _RATE_LIMIT_RECONNECT_DELAY_SECONDS

        if delay is None and self._last_connect_at:
            duration = time.monotonic() - self._last_connect_at
            if duration < _QUICK_DISCONNECT_THRESHOLD_SECONDS:
                self._quick_disconnect_count += 1
            else:
                self._quick_disconnect_count = 0
            if self._quick_disconnect_count >= _MAX_QUICK_DISCONNECTS:
                delay = _RATE_LIMIT_RECONNECT_DELAY_SECONDS
                self._quick_disconnect_count = 0
                logger.warning(
                    "[QQOfficial] Too many quick disconnects; delaying reconnect."
                )

        if delay is None:
            idx = min(
                self._reconnect_attempts,
                len(_RECONNECT_DELAYS_SECONDS) - 1,
            )
            delay = _RECONNECT_DELAYS_SECONDS[idx]
            self._reconnect_attempts += 1

        self._next_connect_at = max(self._next_connect_at, time.monotonic() + delay)
        return delay

    async def wait(self) -> None:
        delay = self._next_connect_at - time.monotonic()
        if delay > 0:
            logger.info(f"[QQOfficial] Waiting {delay:.1f}s before reconnect.")
            await asyncio.sleep(delay)
```

Then `botClient` only forwards:

```python
class botClient(Client):
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        self._reconnect_policy = ReconnectPolicy()

    def mark_websocket_connected(self) -> None:
        self._reconnect_policy.mark_connected()

    def reset_reconnect_backoff(self) -> None:
        self._reconnect_policy.mark_connected()

    def schedule_reconnect_delay(
        self,
        reason: str,
        *,
        custom_delay: float | None = None,
        rate_limited: bool = False,
    ) -> None:
        delay = self._reconnect_policy.schedule(
            custom_delay=custom_delay,
            rate_limited=rate_limited,
        )
        logger.info(
            f"[QQOfficial] Reconnect scheduled in {delay}s, reason: {reason}"
        )

    async def wait_for_reconnect_delay(self) -> None:
        await self._reconnect_policy.wait()
```

This keeps all existing behavior, but the state is localized and the API from the rest of the code becomes:

- `mark_websocket_connected()`
- `reset_reconnect_backoff()`
- `schedule_reconnect_delay(...)`
- `wait_for_reconnect_delay()`

without `_next_connect_at`, `_reconnect_attempts`, `_quick_disconnect_count` being manipulated directly.

If you prefer not to introduce a class, you can keep the methods on `botClient` but still extract the “quick disconnect” detection into a helper to reduce branching:

```python
def _update_quick_disconnect(self) -> bool:
    if not self._last_connect_at:
        return False
    duration = time.monotonic() - self._last_connect_at
    if duration < _QUICK_DISCONNECT_THRESHOLD_SECONDS:
        self._quick_disconnect_count += 1
    else:
        self._quick_disconnect_count = 0
    return self._quick_disconnect_count >= _MAX_QUICK_DISCONNECTS
```

and then in `schedule_reconnect_delay`:

```python
if delay is None and self._update_quick_disconnect():
    delay = _RATE_LIMIT_RECONNECT_DELAY_SECONDS
    self._quick_disconnect_count = 0
    logger.warning(
        "[QQOfficial] Too many quick disconnects; delaying reconnect."
    )
```

### 2. Startup retry logic: unify error policy + delay

`_should_retry_startup_error` and `_next_startup_retry_delay` both need to know:

- whether the error is retryable
- whether it’s rate-limit-related (string markers `"100017"`, `"频率限制"`, `"Too many requests"`)

You can remove duplication by collapsing them into a single function that returns a delay or `None`:

```python
def _get_startup_retry_delay(self, error: Exception) -> float | None:
    # non-retryable
    if not isinstance(
        error,
        (
            asyncio.TimeoutError,
            ConnectionError,
            OSError,
            QQOfficialGatewayUnavailableError,
            botpy.errors.ServerError,
        ),
    ):
        return None

    delay: float | None = None
    if isinstance(error, botpy.errors.ServerError):
        error_msg = str(error)
        if any(marker in error_msg for marker in ("100017", "频率限制", "Too many requests")):
            delay = _RATE_LIMIT_RECONNECT_DELAY_SECONDS

    if delay is None:
        idx = min(
            self._startup_retry_attempts,
            len(self.STARTUP_RETRY_DELAYS_SECONDS) - 1,
        )
        delay = self.STARTUP_RETRY_DELAYS_SECONDS[idx]
    self._startup_retry_attempts += 1
    return delay
```

Then `run` can be simplified:

```python
async def run(self) -> None:
    while not self._shutdown_event.is_set():
        try:
            await self.client.start(appid=self.appid, secret=self.secret)
            self._startup_retry_attempts = 0
            if self._shutdown_event.is_set():
                break

            logger.warning(
                f"[QQOfficial] Client stopped unexpectedly, restarting in "
                f"{self.STARTUP_RETRY_DELAYS_SECONDS[0]}s."
            )
            await self._restart_client()
            if not await self._sleep_until_retry_or_shutdown(
                self.STARTUP_RETRY_DELAYS_SECONDS[0]
            ):
                break

        except asyncio.CancelledError:
            raise
        except Exception as e:
            if self._shutdown_event.is_set():
                break

            delay = self._get_startup_retry_delay(e)
            if delay is None:
                raise

            logger.warning(
                f"[QQOfficial] Startup failed, retrying in {delay}s: {e}"
            )
            await self._restart_client()
            if not await self._sleep_until_retry_or_shutdown(delay):
                break
```

This keeps:

- same retry classification
- same rate-limit-specific behavior
- same backoff sequence

while removing the duplicated string checks and reducing `run`’s branching.
</issue_to_address>
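
The `run` loop above relies on `_sleep_until_retry_or_shutdown` returning a falsy value once shutdown has been requested. The helper's body is not shown in this review, so the following is only one plausible way to write such an interruptible sleep, with the event passed in explicitly to keep the sketch self-contained.

```python
import asyncio


async def sleep_until_retry_or_shutdown(
    shutdown_event: asyncio.Event, delay: float
) -> bool:
    """Wait up to `delay` seconds; return False if shutdown was requested."""
    try:
        # Returns as soon as the event is set, otherwise times out.
        await asyncio.wait_for(shutdown_event.wait(), timeout=delay)
    except asyncio.TimeoutError:
        return True  # delay elapsed without a shutdown request: retry
    return False  # event was set during the wait: stop retrying
```

Waiting on the event instead of calling `asyncio.sleep` directly means a shutdown request ends the wait immediately rather than after the full retry delay.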


@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request enhances the QQ Official API platform adapter with robust connection management, including custom heartbeat intervals, exponential backoff reconnect delays, and automatic startup retries for transient errors. The review feedback highlights two important improvements: first, a potential bug where a session could be added to the connection queue twice during reconnects or invalid sessions, which can be fixed by setting _can_reconnect to False before closing the WebSocket; second, a recommendation to treat all botpy.errors.ServerError exceptions as retryable during startup rather than filtering by specific error messages.

Comment on lines +120 to +137
    if event_op == self.WS_RECONNECT:
        logger.info("[QQOfficial] Gateway requested reconnect.")
        self._client.schedule_reconnect_delay("server requested reconnect")
        self._connection.add(self._session)
        await ws.close()
        return True
    if event_op == self.WS_INVALID_SESSION:
        can_resume = bool(message_event.get("d"))
        logger.warning(
            f"[QQOfficial] Gateway reported invalid session, can_resume={can_resume}."
        )
        if not can_resume:
            self._session["session_id"] = ""
            self._session["last_seq"] = 0
        self._client.schedule_reconnect_delay("invalid session", custom_delay=3)
        self._connection.add(self._session)
        await ws.close()
        return True

high

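When handling WS_RECONNECT and WS_INVALID_SESSION, the code manually calls self._connection.add(self._session) and then calls await ws.close().

However, ws.close() triggers the on_closed callback, and on_closed calls super().on_closed. In botpy's implementation, if self._can_reconnect is True, super().on_closed calls self._connection.add(self._session) again.

As a result, the same session is put into the connection queue twice, which starts two concurrent WebSocket connections. This leads to connection conflicts, frequent disconnects, and rate limiting.

Consider setting self._can_reconnect to False before calling ws.close(), so that on_closed does not enqueue the session a second time.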

Suggested change

    if event_op == self.WS_RECONNECT:
        logger.info("[QQOfficial] Gateway requested reconnect.")
        self._client.schedule_reconnect_delay("server requested reconnect")
        self._connection.add(self._session)
        self._can_reconnect = False
        await ws.close()
        return True
    if event_op == self.WS_INVALID_SESSION:
        can_resume = bool(message_event.get("d"))
        logger.warning(
            f"[QQOfficial] Gateway reported invalid session, can_resume={can_resume}."
        )
        if not can_resume:
            self._session["session_id"] = ""
            self._session["last_seq"] = 0
        self._client.schedule_reconnect_delay("invalid session", custom_delay=3)
        self._connection.add(self._session)
        self._can_reconnect = False
        await ws.close()
        return True
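
The double enqueue described in this comment can be reproduced with a toy model. The class below is purely illustrative and does not use botpy; `ToySocket`, `handle_reconnect`, and `enqueued` are hypothetical names, and `on_closed` only mimics the base-class behaviour as the review describes it.

```python
class ToySocket:
    """Toy model of the close path; not botpy's actual classes."""

    def __init__(self, queue: list[str], session: str) -> None:
        self.queue = queue
        self.session = session
        self.can_reconnect = True

    def on_closed(self) -> None:
        # Mimics the base class: re-queue the session whenever
        # reconnecting is still allowed.
        if self.can_reconnect:
            self.queue.append(self.session)

    def handle_reconnect(self, guard: bool) -> None:
        self.queue.append(self.session)  # explicit re-queue in the op=7 branch
        if guard:
            self.can_reconnect = False  # the suggested fix
        self.on_closed()  # closing the socket fires the close callback


def enqueued(guard: bool) -> int:
    """Count how many times one session ends up in the queue."""
    queue: list[str] = []
    ToySocket(queue, "session-1").handle_reconnect(guard)
    return len(queue)
```

Without the guard the session is queued twice, once by the handler and once by the close callback; with the guard it is queued exactly once.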

Comment on lines +439 to +456
    def _should_retry_startup_error(error: Exception) -> bool:
        if isinstance(
            error,
            (
                asyncio.TimeoutError,
                ConnectionError,
                OSError,
                QQOfficialGatewayUnavailableError,
            ),
        ):
            return True
        if isinstance(error, botpy.errors.ServerError):
            error_msg = str(error)
            return any(
                marker in error_msg
                for marker in ("100017", "频率限制", "Too many requests")
            )
        return False

medium

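In _should_retry_startup_error, a botpy.errors.ServerError currently returns True only when its message contains "100017", "频率限制" ("rate limit"), or "Too many requests".

However, ServerError represents 5xx errors on the QQ server side (such as 500 Internal Server Error, 502 Bad Gateway, and 503 Service Unavailable). These errors are transient and retryable by nature. If the QQ API returns 502 at startup because of a temporary failure, the bot should retry rather than crash.

Consider treating every botpy.errors.ServerError as a retryable startup error.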

    def _should_retry_startup_error(error: Exception) -> bool:
        if isinstance(
            error,
            (
                asyncio.TimeoutError,
                ConnectionError,
                OSError,
                QQOfficialGatewayUnavailableError,
                botpy.errors.ServerError,
            ),
        ):
            return True
        return False

- WS_RECONNECT/WS_INVALID_SESSION: set _can_reconnect=False before ws.close() so botpy on_closed no longer re-adds the session (review: duplicate enqueue caused concurrent websockets)
- mark rate_limit reconnect test async so botpy Client.__init__ finds a running event loop (fixes CI RuntimeError)